bd2ef46b13af6952c0fee445d485e90a024d0a39,src/main/java/com/gameminers/visage/master/VisageMaster.java,VisageMaster,run,#,76
Before Change
conn = factory.newConnection();
channel = conn.createChannel();
Visage.log.finer("Setting up queue '"+queue+"'");
channel.queueDeclare(queue, false, false, true, null);
channel.basicQos(1);
Visage.log.finer("Setting up reply queue");
replyQueue = channel.queueDeclare().getQueue();
consumer = new QueueingConsumer(channel);
channel.basicConsume(replyQueue, consumer);
if (config.getBoolean("slave.enable")) {
Visage.log.info("Starting fallback slave");
fallback = new VisageSlave(config.getConfig("slave").withValue("rabbitmq", config.getValue("rabbitmq")));
fallback.start();
}
Visage.log.info("Starting Jetty");
server.start();
Visage.log.info("Listening for finished jobs");
try {
while (true) {
Delivery delivery = consumer.nextDelivery();
Visage.log.finest("Got delivery");
try {
String corrId = delivery.getProperties().getCorrelationId();
if (queuedJobs.containsKey(corrId)) {
Visage.log.finest("Valid");
responses.put(corrId, delivery.getBody());
Runnable run = queuedJobs.get(corrId);
queuedJobs.remove(corrId);
Visage.log.finest("Removed from queue");
run.run();
Visage.log.finest("Ran runnable");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
Visage.log.finest("Ack'd");
} else {
Visage.log.warning("Unknown correlation ID?");
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
After Change
Log.setLog(new LogShim(Visage.log));
long total = Runtime.getRuntime().totalMemory();
long max = Runtime.getRuntime().maxMemory();
if (Visage.debug) Visage.log.finer("Current heap size: "+humanReadableByteCount(total, false));
if (Visage.debug) Visage.log.finer("Max heap size: "+humanReadableByteCount(max, false));
if (total < max) {
Visage.log.warning("You have set your minimum heap size (Xms) lower than the maximum heap size (Xmx) - this can cause GC thrashing. It is strongly recommended to set them both to the same value.");
}
if (max < (1000*1000*1000)) {
Visage.log.warning("The heap size (Xmx) is less than one gigabyte; it is recommended to run Visage with a gigabyte or more. Use -Xms1G and -Xmx1G to do this.");
}
Visage.log.info("Initializing cache");
if (config.getBoolean("cache.enabled")) {
cache = new GuavaBasicCacheManager();
} else {
cache = new JCSCacheManager(config.getConfig("cache"));
}
Visage.log.info("Setting up Jetty");
Server server = new Server(new InetSocketAddress(config.getString("http.bind"), config.getInt("http.port")));
List<String> expose = config.getStringList("expose");
String poweredBy;
if (expose.contains("server")) {
if (expose.contains("version")) {
poweredBy = "Visage v"+Visage.VERSION;
} else {
poweredBy = "Visage";
}
} else {
poweredBy = null;
}
ResourceHandler resource = new ResourceHandler();
resource.setResourceBase(config.getString("http.static"));
resource.setDirectoriesListed(false);
resource.setWelcomeFiles(new String[] {"index.html"});
resource.setHandler(new VisageHandler(this));
if (!"/dev/null".equals(config.getString("log"))) {
server.setRequestLog(new AsyncNCSARequestLog(config.getString("log")));
}
server.setHandler(new HeaderHandler("X-Powered-By", poweredBy, resource));
Visage.log.info("Connecting to RabbitMQ at "+config.getString("rabbitmq.host")+":"+config.getInt("rabbitmq.port"));
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(config.getString("rabbitmq.host"));
factory.setPort(config.getInt("rabbitmq.port"));
if (config.hasPath("rabbitmq.user")) {
factory.setUsername(config.getString("rabbitmq.user"));
factory.setPassword(config.getString("rabbitmq.password"));
}
String queue = config.getString("rabbitmq.queue");
conn = factory.newConnection();
channel = conn.createChannel();
if (Visage.debug) Visage.log.finer("Setting up queue '"+queue+"'");
channel.queueDeclare(queue, false, false, true, null);
channel.basicQos(1);
if (Visage.debug) Visage.log.finer("Setting up reply queue");
replyQueue = channel.queueDeclare().getQueue();
consumer = new QueueingConsumer(channel);
channel.basicConsume(replyQueue, consumer);
if (config.getBoolean("slave.enable")) {
Visage.log.info("Starting fallback slave");
fallback = new VisageSlave(config.getConfig("slave").withValue("rabbitmq", config.getValue("rabbitmq")));
fallback.start();
}
Visage.log.info("Starting Jetty");
server.start();
Visage.log.info("Listening for finished jobs");
try {
while (true) {
Delivery delivery = consumer.nextDelivery();
if (Visage.trace) Visage.log.finest("Got delivery");
try {
String corrId = delivery.getProperties().getCorrelationId();
if (queuedJobs.containsKey(corrId)) {
if (Visage.trace) Visage.log.finest("Valid");
responses.put(corrId, delivery.getBody());
Runnable run = queuedJobs.get(corrId);
queuedJobs.remove(corrId);
if (Visage.trace) Visage.log.finest("Removed from queue");
run.run();
if (Visage.trace) Visage.log.finest("Ran runnable");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
if (Visage.trace) Visage.log.finest("Ack'd");
} else {
Visage.log.warning("Unknown correlation ID?");
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);